package com.atuguigu.flink.sensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


/**
 * Demo job that extracts the {@code id} field from a stream of {@link SendsorReading}
 * elements in three different ways: an anonymous {@link MapFunction}, a named
 * {@link MapFunction} implementation, and an anonymous {@link FlatMapFunction}.
 */
public class MapExample {

    /**
     * Builds the three id-extracting pipelines on a single source and runs the job.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception propagated from the Flink environment calls
     */
    public static void main(String[] args) throws Exception {
        // Execution environment, pinned to a parallelism of 1.
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // NOTE(review): SensorSource is defined elsewhere; presumably it emits sensor readings.
        final DataStreamSource<SendsorReading> readings =
                environment.addSource(new SensorSource());

        // Variant 1: map each reading to its id with an anonymous function.
        final MapFunction<SendsorReading, String> anonymousIdMapper =
                new MapFunction<SendsorReading, String>() {
                    @Override
                    public String map(SendsorReading reading) throws Exception {
                        return reading.id;
                    }
                };
        readings.map(anonymousIdMapper).print();

        // Variant 2 (preferred): the same mapping via a reusable named class.
        final IdExtracto namedIdMapper = new IdExtracto();
        readings.map(namedIdMapper).print();

        // Variant 3: the same mapping expressed as a flatMap that emits exactly one id per reading.
        final FlatMapFunction<SendsorReading, String> idFlatMapper =
                new FlatMapFunction<SendsorReading, String>() {
                    @Override
                    public void flatMap(SendsorReading reading, Collector<String> out)
                            throws Exception {
                        out.collect(reading.id);
                    }
                };
        readings.flatMap(idFlatMapper).print();

        // Nothing runs until the job is submitted here.
        environment.execute();
    }

    /**
     * Reusable {@link MapFunction} that maps a {@link SendsorReading} to its {@code id} field.
     */
    public static class IdExtracto implements MapFunction<SendsorReading, String> {

        /**
         * Returns the {@code id} field of the given reading.
         *
         * @param sendsorReading the incoming element
         * @return the value of {@code sendsorReading.id}
         * @throws Exception declared by the {@link MapFunction} contract; not thrown explicitly here
         */
        @Override
        public String map(SendsorReading sendsorReading) throws Exception {
            return sendsorReading.id;
        }
    }
}
